package cn.galudisu.spark._2_loadveritydata

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType, _}

import scala.io.Source

/**
  * Reads JSON data into Spark DataFrames in several ways:
  *  - directly from a JSON file, letting Spark infer the schema;
  *  - from an RDD of JSON text lines, again with an inferred schema;
  *  - from the same RDD with an explicit schema (so `registered` becomes a timestamp),
  *    which is then registered as a temp view and filtered with SQL;
  * and finally loads a schema definition back from `json/profileSchema.json`.
  *
  * An explicit `main` is used instead of extending `scala.App`. Spark's documentation
  * warns that `scala.App` subclasses may not work correctly: `App` defers field
  * initialisation, so fields captured by closures can be observed uninitialised.
  */
object DataFrameFromJSON {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataFromJSON").setMaster("local[2]")
    // Build the session once; the SparkContext is taken from it instead of
    // calling the builder a second time.
    val spark = SparkSession.builder().config(conf = conf).getOrCreate()
    val sc    = spark.sparkContext

    try {
      // Schema inferred from the JSON file.
      val dFrame = spark.read.json("json/profiles.json")
      //val dFrame = spark.read.json("hdfs://localhost:9000/data/scalada/profiles.json")
      dFrame.printSchema()
      dFrame.show()

      // Schema inferred from an RDD of JSON strings (one JSON document per line).
      //val strRDD = sc.textFile("hdfs://localhost:9000/data/scalada/profiles.json")
      val strRDD = sc.textFile("json/profiles.json")
      val jsonDf = spark.read.json(strRDD)

      jsonDf.printSchema()
      jsonDf.show()

      // Explicit schema definition.
      val profilesSchema = StructType(
        Seq(
          StructField("_id", StringType, nullable = true),
          StructField("about", StringType, nullable = true),
          StructField("address", StringType, nullable = true),
          StructField("age", IntegerType, nullable = true),
          StructField("company", StringType, nullable = true),
          StructField("email", StringType, nullable = true),
          StructField("eyeColor", StringType, nullable = true),
          StructField("favoriteFruit", StringType, nullable = true),
          StructField("gender", StringType, nullable = true),
          StructField("name", StringType, nullable = true),
          StructField("phone", StringType, nullable = true),
          StructField("registered", TimestampType, nullable = true),
          StructField("tags", ArrayType(StringType), nullable = true)))

      val jsonDfWithSchema = spark.read.schema(profilesSchema).json(strRDD)

      jsonDfWithSchema.printSchema() // "registered" is now a TimestampType column
      jsonDfWithSchema.show()

      jsonDfWithSchema.createOrReplaceTempView("profilesTable")

      // Filter on the timestamp column.
      val filterCount = spark.sql("select * from profilesTable where registered> CAST('2014-08-26 00:00:00' AS " +
        "TIMESTAMP)").count()

      val fullCount = spark.sql("select * from profilesTable").count()

      println(s"All Records Count : $fullCount") //200
      println(s"Filtered based on timestamp count : $filterCount") //106

      // Writes the schema as JSON to a file (presumably how json/profileSchema.json was produced):
      // File("profileSchema.json").writeAll(profilesSchema.json)

      // Read the schema back from a file. The Source is closed once it has been read,
      // and decoded explicitly as UTF-8 rather than the platform default charset.
      val schemaSource = Source.fromFile("json/profileSchema.json", "UTF-8")
      val loadedSchema =
        try DataType.fromJson(schemaSource.mkString)
        finally schemaSource.close()

      println(s"ProfileSchema == loadedSchema :${loadedSchema.json == profilesSchema.json}")
      // Print the loaded schema.
      println(loadedSchema.prettyJson)
    } finally {
      // Release the local Spark resources even if one of the steps above fails.
      spark.stop()
    }
  }
}